使用 Clojure 编写 Hive UDF
Table of Contents
在 Hive 中可以用调用外部脚本的方式处理数据,用外部脚本的好处是不受语言限制,通常情况下我们用Python写一些外部脚本来处理一些数据,但是有些情况下写成 UDF(User-defined function)可以简化查询语句,也能更好地使用条件判断。
Hive 是用 Java 开发的,不过任何运行在 JVM 上的语言,只要支持和 Java 交互就可以写 UDF,选 Clojure 的原因很简单,因为它是 Lisp 方言,也能很好地和 Java 代码交互,并且开发效率很高。
Hive 的自定义函数分了三种:UDTF、UDAF 和 UDF,前两种我基本上用不上。
UDF 编写很简单,只要保证 Hive 能调用你暴露的 evaluate 函数(方法)即可,同时满足:
- 继承 org.apache.hadoop.hive.ql.exec.UDF;
- 调用 UDF 时,保证传入的参数和返回值为 org.apache.hadoop.io.Text 类型(参数和返回值也可以是 String 类型)
本文为 Hive 写一个 upper 函数,将字母转成大写(虽然很无聊,但因为代码量很少,容易让人看懂)
1. 使用 Leiningen 创建项目
关于 Leiningen 是什么、怎么安装,这里不多说了。先创建个项目:
$ lein new upper
然后进入 upper 项目,修改 project.clj,如下:
(defproject upper "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :plugins [[cider/cider-nrepl "0.8.0-SNAPSHOT"]] :dependencies [[org.clojure/clojure "1.5.1"] [hive/hive-exec "0.5.0"] [org.apache.hadoop/hadoop-core "0.20.2-dev"]])
主要改动:
1、增加 Hadoop 和 Hive 的依赖
2、安装 cider/cider-nrepl 插件,便于和 Cider 交互开发
然后自动安装依赖:
$ lein deps
2. 编写 UDF
编辑 src/upper/core.clj,详细请看代码注释:
(ns upper.core (:import [org.apache.hadoop.hive.ql.exec UDF]) (:import [org.apache.hadoop.io Text]) (:gen-class :name upper.core ;; 继承 org.apache.hadoop.hive.ql.exec.UDF,必须的 :extends org.apache.hadoop.hive.ql.exec.UDF ;; 暴露 evaluate 方法给 Hive,同时声明参数类型和返回值类型 ;; 列表里是参数类型,参数数目必须和列表数目对应 ;; 参数和返回值都必须是 org.apache.hadoop.io.Text 类型 :methods [[evaluate [org.apache.hadoop.io.Text] org.apache.hadoop.io.Text]])) (defn upper [string] (.toUpperCase string)) ;; #^Text 是元数据,声明函数返回的是 Text 类型 (defn #^Text -evaluate ;; 声明参数为 Text 类型 [this #^Text string] ;; 先将参数值转成字符串类型处理后,再包装成 Text 类型 (Text. (upper (.toString string))))
这里要注意一个细节,新建 Text 实例时不能给类型为 nil(对应 Java 中的 null)的参数,所以必须保证 upper 函数调用结果是字符串,否则会异常。
3. 编译成 class 文件并打包
首先,需要修改 project.clj,增加 :aot 选项:
:aot [upper.core]
然后,将代码编译成 class。编译时一定要注意 JDK 版本,如果 Hive 用的 Java6,就必须用 Java6 编译:
$ lein compile Compiling upper.core
最后,如果编译顺利通过,再将它打包成 jar 文件:
$ lein uberjar Created /tmp/upper/target/upper-0.1.0-SNAPSHOT.jar Created /tmp/upper/target/upper-0.1.0-SNAPSHOT-standalone.jar
这时会在 target 目录下生成两个 .jar 文件,其中以“-standalone.jar”结尾的 jar 文比较大,应该有 17M 左右,因为它包含了所有的依赖包括 Clojure 自身),所以可以直接放 Hive 上使用。
4. 在 Hive 中调用
为了方便测试,新建一个 hive.sql 文件,把 HiveQL 语句写进去:
-- 将生成的 jar 文件添加到 Hive 中,它会自动分发给其他节点 add jar target/upper-0.1.0-SNAPSHOT-standalone.jar; -- 注册成 Hive 函数 create temporary function my_upper as 'upper.core'; select my_upper(en_name) from user_information;
然后执行:
$ hive -f hive.sql
如果顺利的话,Hive 将正常返回调用结果。
5. 关于调试
开发 UDF 难免会遇到代码错误,可能是编译中出现错误,也可能是运行时出错。编译中出现错误一般就是语法一类的问题,很好解决;但如果是在 Hive 运行时出错,MapReduce 任务会被杀死,调试也会变得比较难,所以在这之前建议写好单元测试,保证各个部分的代码稳定后再放到 Hive 中。如果在 Hive 运行中死掉,可以到 JobTracker 的 Web 监控页面看调试日志中的 Java 异常信息。
6. 关于性能优化
工作中我主要是写 UDF 处理海量日志,在写某个大量匹配功能的 UDF 时,一千万条日志花了 4 小时。但日志里要匹配的字段有大量是重复的,所以我用了 memoize 函数缓存结果,这时一千万条日志只跑了不到半个小时。如果你处理的数据中如果有比较多重复数据的话,建议使用 memoize。
另外,在大规模数据处理下,一些代码细节优化对速度提升很不明显,建议仔细揣摩代码,优先优化流程。像上面说的场景,我在优化了数据处理流程的情况下,最后只花几分钟就可以处理完一千万条日志了。